{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%run startup.py"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "application/javascript": [
       "$.getScript('./assets/js/ipython_notebook_toc.js')"
      ],
      "text/plain": [
       "<IPython.core.display.Javascript object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "%%javascript\n",
    "$.getScript('./assets/js/ipython_notebook_toc.js')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# A Decision Tree of Observable Operators\n",
    "\n",
    "## Part 8: Hot and Cold Observables\n",
    "\n",
    "> source: http://reactivex.io/documentation/operators.html#tree.  \n",
    "> (transcribed to RxPY 1.5.7, Py2.7 / 2016-12, Gunther Klessinger, [axiros](http://www.axiros.com))  \n",
    "\n",
    "**This tree can help you find the ReactiveX Observable operator you’re looking for.**  \n",
    "See [Part 1](./A Decision Tree of Observable Operators. Part I - Creation.ipynb) for Usage and Output Instructions.  \n",
    "\n",
    "We also require acquaintance with the [marble diagrams](./Marble Diagrams.ipynb) feature of RxPy.\n",
    "\n",
    "<h2 id=\"tocheading\">Table of Contents</h2>\n",
    "<div id=\"toc\"></div>\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# I want an Observable that does not start emitting items to subscribers until asked [publish, publish_value, multicast, let/let_bind](http://reactivex.io/documentation/operators/publish.html)\n",
    "This is basically multicast."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== publish ==========\n",
      "\n",
      "module rx.linq.observable.publish\n",
      "@extensionmethod(Observable)\n",
      "def publish(self, selector=None):\n",
      "    Returns an observable sequence that is the result of invoking the\n",
      "    selector on a connectable observable sequence that shares a single\n",
      "    subscription to the underlying sequence. This operator is a\n",
      "    specialization of Multicast using a regular Subject.\n",
      "\n",
      "    Example:\n",
      "    res = source.publish()\n",
      "    res = source.publish(lambda x: x)\n",
      "\n",
      "    selector -- {Function} [Optional] Selector function which can use the\n",
      "        multicasted source sequence as many times as needed, without causing\n",
      "        multiple subscriptions to the source sequence. Subscribers to the\n",
      "        given source will receive all notifications of the source from the\n",
      "        time of the subscription on.\n",
      "\n",
      "    Returns an observable {Observable} sequence that contains the elements\n",
      "    of a sequence produced by multicasting the source sequence within a\n",
      "    selector function.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "\n",
      "========== Reminder: 2 subscribers on a cold stream: ==========\n",
      "\n",
      "\n",
      "   0.6     M New subscription (81636) on stream 276597469\n",
      "   1.6     M .........EMITTING........\n",
      " 107.1     M [next]  106.4: 78 -> 81636  \n",
      " 107.4     M [cmpl]  106.7: fin -> 81636  \n",
      "\n",
      " 108.3     M New subscription (94168) on stream 276597493\n",
      " 108.9     M .........EMITTING........\n",
      " 214.3     M main thread sleeping 0.4s\n",
      " 314.0    T4 [next]  205.7: 93 -> 94168  \n",
      " 314.2    T4 [cmpl]  205.9: fin -> 94168  \n",
      "\n",
      "\n",
      "========== Now 2 subscribers on a PUBLISHED (hot) stream ==========\n",
      "\n",
      "\n",
      "   0.7     M New subscription (subs1) on stream 276598873\n",
      "\n",
      "   1.7     M New subscription (subs2) on stream 276598929\n",
      "   2.6     M now connect\n",
      "   3.2     M .........EMITTING........\n",
      " 105.0     M [next]  104.3: 66 -> subs1  \n",
      " 106.2     M [cmpl]  105.3: fin -> subs1  \n",
      "\n",
      " 107.8     M New subscription (subs3) on stream 276598873\n",
      " 108.2     M [cmpl]    0.3: fin -> subs3  \n"
     ]
    }
   ],
   "source": [
    "rst(O.publish)\n",
    "    \n",
    "def emit(obs):\n",
    "    log('.........EMITTING........')\n",
    "    sleep(0.1)\n",
    "    obs.on_next(rand())\n",
    "    obs.on_completed()\n",
    "    \n",
    "rst(title='Reminder: 2 subscribers on a cold stream:')    \n",
    "s = O.create(emit)\n",
    "d = subs(s), subs(s.delay(100))\n",
    "\n",
    "\n",
    "rst(title='Now 2 subscribers on a PUBLISHED (hot) stream', sleep=0.4)    \n",
    "sp = s.publish()\n",
    "subs(sp, name='subs1')\n",
    "subs(sp.delay(100), name='subs2')\n",
    "log('now connect')\n",
    "# this creates a 'single, intermediate subscription between stream and subs' \n",
    "d = sp.connect()\n",
    "\n",
    "# will only see the finish, since subscribed too late\n",
    "d = subs(sp, name='subs3')\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== publish_value ==========\n",
      "\n",
      "module rx.linq.observable.publishvalue\n",
      "@extensionmethod(Observable)\n",
      "def publish_value(self, initial_value, selector=None):\n",
      "    Returns an observable sequence that is the result of invoking the\n",
      "    selector on a connectable observable sequence that shares a single\n",
      "    subscription to the underlying sequence and starts with initial_value.\n",
      "\n",
      "    This operator is a specialization of Multicast using a BehaviorSubject.\n",
      "\n",
      "    Example:\n",
      "    res = source.publish_value(42)\n",
      "    res = source.publish_value(42, lambda x: x.map(lambda y: y * y))\n",
      "\n",
      "    Keyword arguments:\n",
      "    initial_value -- {Mixed} Initial value received by observers upon\n",
      "        subscription.\n",
      "    selector -- {Function} [Optional] Optional selector function which can\n",
      "        use the multicasted source sequence as many times as needed, without\n",
      "        causing multiple subscriptions to the source sequence. Subscribers\n",
      "        to the given source will receive immediately receive the initial\n",
      "        value, followed by all notifications of the source from the time of\n",
      "        the subscription on.\n",
      "\n",
      "    Returns {Observable} An observable sequence that contains the elements\n",
      "    of a sequence produced by multicasting the source sequence within a\n",
      "    selector function.\n",
      "--------------------------------------------------------------------------------\n",
      "Everybody gets the initial value and the events, sideeffect only once per ev\n",
      "\n",
      "   4.2     M New subscription (81753) on stream 276592833\n",
      "   4.4     M [next]    0.2: 42 -> 81753  \n",
      "\n",
      "   5.0     M New subscription (94051) on stream 276592873\n",
      "  92.0    T5 [next]  209.7: 66 -> subs2  \n",
      "  92.5    T5 [cmpl]  210.1: fin -> subs2  \n",
      " 109.9    T6 [next]  104.7: 42 -> 94051  \n",
      " 508.5    T7 sideffect (0,)\n",
      " 509.1    T7 [next]  504.9: 0 -> 81753  \n",
      " 611.6    T8 [next]  606.4: 0 -> 94051  \n",
      "1012.0    T9 sideffect (1,)\n",
      "1012.6    T9 [next] 1008.4: 1 -> 81753  \n",
      "1117.0   T10 [next] 1111.8: 1 -> 94051  \n",
      "1312.3     M disposing now\n"
     ]
    }
   ],
   "source": [
    "rst(O.publish_value)\n",
    "\n",
    "def sideeffect(*x):\n",
    "    log('sideffect', x)\n",
    "\n",
    "print('Everybody gets the initial value and the events, sideeffect only once per ev')\n",
    "src = O.interval(500).take(20).do_action(sideeffect)\n",
    "published = src.publish_value(42)\n",
    "subs(published), subs(published.delay(100))\n",
    "d = published.connect()\n",
    "sleep(1.3)\n",
    "log('disposing now')\n",
    "d.dispose()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## ... and then only emits the last item in its sequence **[publish_last](http://reactivex.io/documentation/operators/publish.html)**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# not yet in RXPy"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "## ... via **[multicast](http://reactivex.io/documentation/operators/publish.html)**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "RxPY also has a **[multicast](http://reactivex.io/documentation/operators/publish.html)** operator which operates on an ordinary Observable, multicasts that Observable by means of a particular Subject that you specify, applies a transformative function to each emission, and then emits those transformed values as its own ordinary Observable sequence.\n",
    "\n",
    "Each subscription to this new Observable will trigger a new subscription to the underlying multicast Observable.  \n",
    "Following the **RXJS** example at [reactive.io docu](http://reactivex.io/documentation/operators/publish.html):\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== multicast ==========\n",
      "\n",
      "module rx.linq.observable.multicast\n",
      "@extensionmethod(Observable)\n",
      "def multicast(self, subject=None, subject_selector=None, selector=None):\n",
      "    Multicasts the source sequence notifications through an instantiated\n",
      "    subject into all uses of the sequence within a selector function. Each\n",
      "    subscription to the resulting sequence causes a separate multicast\n",
      "    invocation, exposing the sequence resulting from the selector function's\n",
      "    invocation. For specializations with fixed subject types, see Publish,\n",
      "    PublishLast, and Replay.\n",
      "\n",
      "    Example:\n",
      "    1 - res = source.multicast(observable)\n",
      "    2 - res = source.multicast(subject_selector=lambda: Subject(),\n",
      "                               selector=lambda x: x)\n",
      "\n",
      "    Keyword arguments:\n",
      "    subject_selector -- {Function} Factory function to create an\n",
      "        intermediate subject through which the source sequence's elements\n",
      "        will be multicast to the selector function.\n",
      "    subject -- Subject {Subject} to push source elements into.\n",
      "    selector -- {Function} [Optional] Optional selector function which can\n",
      "        use the multicasted source sequence subject to the policies enforced\n",
      "        by the created subject. Specified only if subject_selector\" is a\n",
      "        factory function.\n",
      "\n",
      "    Returns an observable {Observable} sequence that contains the elements\n",
      "    of a sequence produced by multicasting the source sequence within a\n",
      "    selector function.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   2.6     M New subscription (17367) on stream 276597497\n",
      "we have now 3 subscriptions, only two will see values.\n",
      "start multicast stream (calling connect):\n",
      "   5.1     M emitting 10\n",
      "   5.2     M [next]    2.3: 10 -> 17367  \n",
      "   5.4     M [next]    2.4: 10 -> 17367  \n",
      "   6.0     M emitting 67\n",
      "   6.5     M [next]    3.6: 67 -> 17367  \n",
      "   6.8     M [next]    3.9: 67 -> 17367  \n",
      "   6.9     M complete\n",
      "   7.0     M [cmpl]    4.1: fin -> 17367  \n",
      "   7.1     M [cmpl]    4.2: fin -> 17367  \n"
     ]
    }
   ],
   "source": [
    "rst(O.multicast)\n",
    "# show actions on intermediate subject:\n",
    "show = False\n",
    "\n",
    "def emit(obs):\n",
    "    'instead of range we allow some logging:'\n",
    "    for i in (1, 2):\n",
    "        v = rand()\n",
    "        log('emitting', v)\n",
    "        obs.on_next(v)\n",
    "    log('complete')\n",
    "    obs.on_completed()\n",
    "    \n",
    "\n",
    "class MySubject:\n",
    "    def __init__(self):\n",
    "        self.rx_subj = Subject()\n",
    "        if show:\n",
    "            log('New Subject %s created' % self)\n",
    "\n",
    "        \n",
    "    def __str__(self):\n",
    "        return str(hash(self))[-4:]\n",
    "    \n",
    "    def __getattr__(self, a):\n",
    "        'called at any attr. access, logging it'\n",
    "        if not a.startswith('__') and show:\n",
    "            log('RX called', a, 'on MySub\\n')\n",
    "        return getattr(self.rx_subj, a)\n",
    "        \n",
    "        \n",
    "subject1 = MySubject()\n",
    "subject2 = MySubject()\n",
    "\n",
    "source = O.create(emit).multicast(subject2)\n",
    "\n",
    "# a \"subscription\" *is* a disposable\n",
    "# (the normal d we return all the time):\n",
    "d, observer  = subs(source, return_subscriber=True)\n",
    "ds1 = subject1.subscribe(observer)\n",
    "ds2 = subject2.subscribe(observer)\n",
    "print ('we have now 3 subscriptions, only two will see values.')\n",
    "print('start multicast stream (calling connect):')\n",
    "connected = source.connect()\n",
    "d.dispose()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 58,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== let_bind ==========\n",
      "\n",
      "module rx.linq.observable.let\n",
      "@extensionmethod(Observable, alias=\"let\")\n",
      "def let_bind(self, func):\n",
      "    Returns an observable sequence that is the result of invoking the\n",
      "    selector on the source sequence, without sharing subscriptions. This\n",
      "    operator allows for a fluent style of writing queries that use the same\n",
      "    sequence multiple times.\n",
      "\n",
      "    selector -- {Function} Selector function which can use the source\n",
      "        sequence as many times as needed, without sharing subscriptions to\n",
      "        the source sequence.\n",
      "\n",
      "    Returns an observable {Observable} sequence that contains the elements\n",
      "    of a sequence produced by multicasting the source sequence within a\n",
      "    selector function.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "\n",
      "========== without let ==========\n",
      "\n",
      "\n",
      "   1.4     M New subscription (46742) on stream 276639557\n",
      "   1.8     M emitting 99\n",
      "   2.0     M [next]    0.4: 99 -> 46742  \n",
      "   2.2     M complete\n",
      "   2.4     M emitting 47\n",
      "   2.6     M [next]    1.1: 47 -> 46742  \n",
      "   2.7     M complete\n",
      "   2.9     M [cmpl]    1.3: fin -> 46742  \n",
      "\n",
      "   3.5     M New subscription (16738) on stream 276625117\n",
      "   3.8     M emitting 62\n",
      "   3.9     M [next]    0.3: 62 -> 16738  \n",
      "   4.0     M complete\n",
      "   4.3     M emitting 100\n",
      "   4.4     M [next]    0.8: 100 -> 16738  \n",
      "   4.5     M complete\n",
      "   4.6     M [cmpl]    1.0: fin -> 16738  \n",
      "\n",
      "\n",
      "========== now with let ==========\n",
      "\n",
      "\n",
      "   5.3     M New subscription (59075) on stream 276641713\n",
      "   5.6     M emitting 27\n",
      "   5.8     M [next]    0.4: 27 -> 59075  \n",
      "   5.8     M complete\n",
      "   5.9     M emitting 86\n",
      "   6.1     M [next]    0.7: 86 -> 59075  \n",
      "   6.3     M complete\n",
      "   6.4     M [cmpl]    1.1: fin -> 59075  \n",
      "\n",
      "   7.4     M New subscription (59039) on stream 276625113\n",
      "   7.7     M emitting 26\n",
      "   7.9     M [next]    0.5: 26 -> 59039  \n",
      "   8.2     M complete\n",
      "   8.6     M emitting 39\n",
      "   8.8     M [next]    1.3: 39 -> 59039  \n",
      "   9.0     M complete\n",
      "   9.3     M [cmpl]    1.8: fin -> 59039  \n"
     ]
    }
   ],
   "source": [
    "rst(O.let)\n",
    "# show actions on intermediate subject:\n",
    "show = True\n",
    "\n",
    "def emit(obs):\n",
    "    'instead of range we allow some logging:'\n",
    "    v = rand()\n",
    "    log('emitting', v)\n",
    "    obs.on_next(v)\n",
    "    log('complete')\n",
    "    obs.on_completed()\n",
    "    \n",
    "source = O.create(emit)\n",
    "\n",
    "# following the RXJS example:\n",
    "header(\"without let\")\n",
    "d = subs(source.concat(source))\n",
    "d = subs(source.concat(source))\n",
    "\n",
    "header(\"now with let\")\n",
    "d = subs(source.let(lambda o: o.concat(o)))\n",
    "d = subs(source.let(lambda o: o.concat(o)))\n",
    "# TODO: Not understood:\n",
    "# \"This operator allows for a fluent style of writing queries that use the same sequence multiple times.\"\n",
    "# ... I can't verify this, the source sequence is not duplicated but called every time like a cold obs."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "## ... and then emits the complete sequence, even to those who subscribe after the sequence has begun **[replay](http://reactivex.io/documentation/operators/replay.html)**\n",
    "\n",
    "A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== replay ==========\n",
      "\n",
      "module rx.linq.observable.replay\n",
      "@extensionmethod(Observable)\n",
      "def replay(self, selector, buffer_size=None, window=None, scheduler=None):\n",
      "    Returns an observable sequence that is the result of invoking the\n",
      "    selector on a connectable observable sequence that shares a single\n",
      "    subscription to the underlying sequence replaying notifications subject\n",
      "    to a maximum time length for the replay buffer.\n",
      "\n",
      "    This operator is a specialization of Multicast using a ReplaySubject.\n",
      "\n",
      "    Example:\n",
      "    res = source.replay(buffer_size=3)\n",
      "    res = source.replay(buffer_size=3, window=500)\n",
      "    res = source.replay(None, 3, 500, scheduler)\n",
      "    res = source.replay(lambda x: x.take(6).repeat(), 3, 500, scheduler)\n",
      "\n",
      "    Keyword arguments:\n",
      "    selector -- [Optional] Selector function which can use the multicasted\n",
      "        source sequence as many times as needed, without causing multiple\n",
      "        subscriptions to the source sequence. Subscribers to the given\n",
      "        source will receive all the notifications of the source subject to\n",
      "        the specified replay buffer trimming policy.\n",
      "    buffer_size -- [Optional] Maximum element count of the replay buffer.\n",
      "    window -- [Optional] Maximum time length of the replay buffer.\n",
      "    scheduler -- [Optional] Scheduler where connected observers within the\n",
      "        selector function will be invoked on.\n",
      "\n",
      "    Returns {Observable} An observable sequence that contains the elements\n",
      "    of a sequence produced by multicasting the source sequence within a\n",
      "    selector function.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "\n",
      "========== playing and replaying... ==========\n",
      "\n",
      "\n",
      "   2.8     M New subscription (Replay Subs 1) on stream 276680345\n",
      "   3.6     M modified_stream (take 2)\n",
      "   4.4     M calling connect now...\n",
      "   5.3     M emitting nr 0, value 229 \n",
      "\n",
      "   5.5     M sync sideeffect (0.2s) ('nr 0, value 229',) \n",
      "\n",
      " 207.6     M end sideeffect ('nr 0, value 229',) \n",
      "\n",
      " 409.5     M emitting nr 1, value 981 \n",
      "\n",
      " 409.9     M sync sideeffect (0.2s) ('nr 1, value 981',) \n",
      "\n",
      " 612.9     M end sideeffect ('nr 1, value 981',) \n",
      "\n",
      " 818.6     M emitting nr 2, value 773 \n",
      "\n",
      " 819.5     M sync sideeffect (0.2s) ('nr 2, value 773',) \n",
      "\n",
      "1025.2     M end sideeffect ('nr 2, value 773',) \n",
      "\n",
      "1228.8     M emitting nr 3, value 711 \n",
      "\n",
      "1429.7     M emitting nr 4, value 427 \n",
      "\n",
      "1635.6     M [next] 1632.7: MODIFIED FOR REPLAY: nr 0, value 229 -> Replay Subs 1\n",
      "\n",
      "1635.9     M [next] 1633.1: MODIFIED FOR REPLAY: nr 1, value 981 -> Replay Subs 1\n",
      "\n",
      "1636.3     M [cmpl] 1633.5: fin -> Replay Subs 1\n",
      "\n"
     ]
    }
   ],
   "source": [
    "rst(O.replay)\n",
    "\n",
    "def emit(obs):\n",
    "    'continuous emission'\n",
    "    for i in range(0, 5):\n",
    "        v = 'nr %s, value %s' % (i, rand())\n",
    "        log('emitting', v, '\\n')\n",
    "        obs.on_next(v)\n",
    "        sleep(0.2)    \n",
    "    \n",
    "\n",
    "def sideeffect(*v):\n",
    "    log(\"sync sideeffect (0.2s)\", v, '\\n')\n",
    "    sleep(0.2)\n",
    "    log(\"end sideeffect\", v, '\\n')\n",
    "    \n",
    "\n",
    "def modified_stream(o):\n",
    "    log('modified_stream (take 2)')\n",
    "    return o.map(lambda x: 'MODIFIED FOR REPLAY: %s' % x).take(2)\n",
    "\n",
    "header(\"playing and replaying...\")\n",
    "subject = Subject()\n",
    "cold = O.create(emit).take(3).do_action(sideeffect)\n",
    "\n",
    "assert not getattr(cold, 'connect', None)\n",
    "hot = cold.multicast(subject)\n",
    "connect = hot.connect # present now.\n",
    "\n",
    "#d, observer = subs(hot, return_subscriber=True, name='normal subscriber\\n')\n",
    "#d1 = subject.subscribe(observer)\n",
    "\n",
    "published = hot.replay(modified_stream, 1000, 50000)\n",
    "d2 = subs(published, name='Replay Subs 1\\n')\n",
    "\n",
    "\n",
    "#header(\"replaying again\")\n",
    "#d = subs(published, name='Replay Subs 2\\n')\n",
    "log('calling connect now...')\n",
    "d3 = hot.connect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you apply the Replay operator to an Observable\n",
    "\n",
    "- **before** you convert it into a connectable Observable,\n",
    "- the resulting connectable Observable will always emit the same complete sequence to any future observers,\n",
    "- even those observers that subscribe after the connectable Observable has begun to emit items to other subscribed observers(!)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== True ==========\n",
      "\n",
      "\n",
      "   0.5     M New subscription (Normal) on stream 276498261\n",
      "\n",
      "   1.8     M New subscription (Replayer A) on stream 276487297\n",
      "\n",
      "   4.3     M New subscription (Replayer B) on stream 276487297\n",
      " 105.8  T160 sideeffect 0\n",
      "\n",
      " 106.5  T162 sideeffect 0\n",
      " 106.1  T161 sideeffect 0\n",
      " 106.8  T160 [next]  106.2: 0 -> Normal\n",
      "\n",
      "\n",
      "\n",
      " 108.4  T162 [next]  104.1: marked 0 -> Replayer B\n",
      " 108.1  T161 [next]  105.7: marked 0 -> Replayer A\n",
      "\n",
      "\n",
      " 214.5  T164 sideeffect 1\n",
      " 214.8  T165 sideeffect 1\n",
      " 215.2  T163 sideeffect 1\n",
      "\n",
      "\n",
      "\n",
      " 215.9  T164 [next]  215.3: 1 -> Normal\n",
      " 215.9  T165 [next]  213.5: marked 1 -> Replayer A\n",
      " 216.7  T163 [next]  212.4: marked 1 -> Replayer B\n",
      "\n",
      "\n",
      "\n",
      " 318.1  T166 sideeffect 2\n",
      "\n",
      " 318.7  T166 [next]  316.3: marked 2 -> Replayer A\n",
      "\n",
      " 320.0  T166 [next]  317.7: marked 0 -> Replayer A\n",
      "\n",
      " 320.7  T166 [next]  318.4: marked 1 -> Replayer A\n",
      "\n",
      " 321.4  T166 [next]  319.0: marked 2 -> Replayer A\n",
      "\n",
      " 321.9  T166 [cmpl]  319.5: fin -> Replayer A\n",
      "\n",
      " 323.3  T167 sideeffect 2\n",
      " 323.6  T168 sideeffect 2\n",
      "\n",
      "\n",
      " 325.0  T168 [next]  320.7: marked 2 -> Replayer B\n",
      " 323.6  T167 [next]  323.0: 2 -> Normal\n",
      "\n",
      "\n",
      " 326.4  T168 [next]  322.1: marked 0 -> Replayer B\n",
      " 324.4  T167 [cmpl]  323.8: fin -> Normal\n",
      "\n",
      "\n",
      " 327.0  T168 [next]  322.7: marked 1 -> Replayer B\n",
      "\n",
      " 327.4  T168 [next]  323.1: marked 2 -> Replayer B\n",
      "\n",
      " 327.5  T168 [cmpl]  323.2: fin -> Replayer B\n",
      "\n",
      "\n",
      "\n",
      "========== now with publish - no more sideeffects in the replays ==========\n",
      "\n",
      "\n",
      "\n",
      "========== True ==========\n",
      "\n",
      "\n",
      "   0.2     M New subscription (Normal) on stream 276784605\n",
      "\n",
      "   0.7     M New subscription (Replayer A) on stream 276784601\n",
      "\n",
      "   2.9     M New subscription (Replayer B) on stream 276784601\n",
      " 109.3  T172 sideeffect 0\n",
      "\n",
      " 109.9  T172 [next]  109.6: 0 -> Normal\n",
      "\n",
      " 110.6  T172 [next]  109.7: marked 0 -> Replayer A\n",
      "\n",
      " 111.5  T172 [next]  108.5: marked 0 -> Replayer B\n",
      "\n",
      " 214.4  T173 sideeffect 1\n",
      "\n",
      " 214.7  T173 [next]  214.4: 1 -> Normal\n",
      "\n",
      " 215.1  T173 [next]  214.1: marked 1 -> Replayer A\n",
      "\n",
      " 215.4  T173 [next]  212.4: marked 1 -> Replayer B\n",
      "\n",
      " 321.3  T174 sideeffect 2\n",
      "\n",
      " 321.6  T174 [next]  321.3: 2 -> Normal\n",
      "\n",
      " 322.0  T174 [next]  321.0: marked 2 -> Replayer A\n",
      "\n",
      " 322.9  T174 [next]  321.9: marked 0 -> Replayer A\n",
      "\n",
      " 323.3  T174 [next]  322.3: marked 1 -> Replayer A\n",
      "\n",
      " 323.8  T174 [next]  322.9: marked 2 -> Replayer A\n",
      "\n",
      " 323.9  T174 [cmpl]  322.9: fin -> Replayer A\n",
      "\n",
      " 325.0  T174 [next]  322.0: marked 2 -> Replayer B\n",
      "\n",
      " 326.5  T174 [next]  323.5: marked 0 -> Replayer B\n",
      "\n",
      " 327.0  T174 [next]  324.0: marked 1 -> Replayer B\n",
      "\n",
      " 327.1  T174 [next]  324.1: marked 2 -> Replayer B\n",
      "\n",
      " 327.2  T174 [cmpl]  324.2: fin -> Replayer B\n",
      "\n",
      " 327.9  T174 [cmpl]  327.7: fin -> Normal\n",
      "\n"
     ]
    }
   ],
   "source": [
    "\n",
    "def mark(x):\n",
    "    return 'marked %x' % x\n",
    "def side_effect(x):\n",
    "    log('sideeffect %s\\n' % x)\n",
    "    \n",
    "    \n",
    "for i in 1, 2:\n",
    "    s = O.interval(100).take(3).do_action(side_effect)\n",
    "    if i == 2:\n",
    "        sleep(1)\n",
    "        header(\"now with publish - no more sideeffects in the replays\")\n",
    "        s = s.publish()\n",
    "        \n",
    "    reset_start_time()\n",
    "    published = s.replay(lambda o: o.map(mark).take(3).repeat(2), 3)\n",
    "    \n",
    "    d = subs(s,         name='Normal\\n')\n",
    "    d = subs(published, name='Replayer A\\n')\n",
    "    d = subs(published, name='Replayer B\\n')\n",
    "    if i == 2:\n",
    "        d = s.connect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "## ... but I want it to go away once all of its subscribers unsubscribe **[ref_count, share](http://reactivex.io/documentation/operators/refcount.html)**\n",
    "\n",
    "A connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when the Connect operator is applied to it. In this way you can prompt an Observable to begin emitting items at a time of your choosing.\n",
    "\n",
    "The RefCount operator automates the process of connecting to and disconnecting from a connectable Observable. It operates on a connectable Observable and returns an ordinary Observable. When the first observer subscribes to this Observable, RefCount connects to the underlying connectable Observable. RefCount then keeps track of how many other observers subscribe to it and does not disconnect from the underlying connectable Observable until the last observer has done so."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== publish ==========\n",
      "\n",
      "module rx.linq.observable.publish\n",
      "@extensionmethod(Observable)\n",
      "def publish(self, selector=None):\n",
      "    Returns an observable sequence that is the result of invoking the\n",
      "    selector on a connectable observable sequence that shares a single\n",
      "    subscription to the underlying sequence. This operator is a\n",
      "    specialization of Multicast using a regular Subject.\n",
      "\n",
      "    Example:\n",
      "    res = source.publish()\n",
      "    res = source.publish(lambda x: x)\n",
      "\n",
      "    selector -- {Function} [Optional] Selector function which can use the\n",
      "        multicasted source sequence as many times as needed, without causing\n",
      "        multiple subscriptions to the source sequence. Subscribers to the\n",
      "        given source will receive all notifications of the source from the\n",
      "        time of the subscription on.\n",
      "\n",
      "    Returns an observable {Observable} sequence that contains the elements\n",
      "    of a sequence produced by multicasting the source sequence within a\n",
      "    selector function.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   2.2     M New subscription (00656) on stream 276501981\n",
      "\n",
      "   3.8     M New subscription (75193) on stream 276501981\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x107b15390>"
      ]
     },
     "execution_count": 33,
     "metadata": {},
     "output_type": "execute_result"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1006.1  T182 [next] 1003.8: 0 -> 00656\n",
      "1006.6  T182 [next] 1002.8: 0 -> 75193\n",
      "2009.3  T183 [next] 2007.0: 1 -> 00656\n",
      "2010.0  T183 [next] 2006.1: 1 -> 75193\n",
      "2010.4  T183 [cmpl] 2008.1: fin -> 00656\n",
      "2010.6  T183 [cmpl] 2006.8: fin -> 75193\n"
     ]
    }
   ],
   "source": [
    "rst(O.interval(1).publish)\n",
    "publ = O.interval(1000).take(2).publish().ref_count()\n",
    "# be aware about potential race conditions here\n",
    "subs(publ)\n",
    "subs(publ)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== share ==========\n",
      "\n",
      "module rx.linq.observable.publish\n",
      "@extensionmethod(Observable)\n",
      "def share(self):\n",
      "    Share a single subscription among multple observers.\n",
      "\n",
      "    Returns a new Observable that multicasts (shares) the original\n",
      "    Observable. As long as there is at least one Subscriber this\n",
      "    Observable will be subscribed and emitting data. When all\n",
      "    subscribers have unsubscribed it will unsubscribe from the source\n",
      "    Observable.\n",
      "\n",
      "    This is an alias for Observable.publish().ref_count().\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      "   1.3     M New subscription (SourceA) on stream 276514157\n",
      "\n",
      "   2.8     M New subscription (SourceB) on stream 276514157\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x107b455d0>"
      ]
     },
     "execution_count": 41,
     "metadata": {},
     "output_type": "execute_result"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      " 206.9  T190 sideeffect 0\n",
      " 207.4  T190 [next]  205.9: 0 -> SourceA\n",
      " 207.5  T190 [next]  204.5: 0 -> SourceB\n",
      " 410.9  T191 sideeffect 1\n",
      " 411.2  T191 [next]  409.7: 1 -> SourceA\n",
      " 411.3  T191 [next]  408.2: 1 -> SourceB\n",
      " 411.4  T191 [cmpl]  409.8: fin -> SourceA\n",
      " 411.6  T191 [cmpl]  408.5: fin -> SourceB\n"
     ]
    }
   ],
   "source": [
    "rst(O.interval(1).share)\n",
    "def sideffect(v):\n",
    "    log('sideeffect %s\\n' % v)\n",
    "publ = O.interval(200).take(2).do_action(sideeffect).share()\n",
    "\n",
    "'''\n",
    "When the number of observers subscribed to published observable goes from\n",
    "0 to 1, we connect to the underlying observable sequence.\n",
    "published.subscribe(createObserver('SourceA'));\n",
    "When the second subscriber is added, no additional subscriptions are added to the\n",
    "underlying observable sequence. As a result the operations that result in side\n",
    "effects are not repeated per subscriber.\n",
    "\n",
    "'''\n",
    "subs(publ, name='SourceA')\n",
    "subs(publ, name='SourceB')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "## ... and then I want to ask it to start **[connect](http://reactivex.io/documentation/operators/connect.html)**\n",
    "\n",
    "You can use the publish operator to convert an ordinary Observable into a ConnectableObservable.\n",
    "\n",
    "Call a ConnectableObservable’s connect method to instruct it to begin emitting the items from its underlying Observable to its Subscribers.\n",
    "\n",
    "<img src=\"./assets/img/publishConnect.png\" width=\"400px\">\n",
    "\n",
    "\n",
    "The connect method returns a Disposable. You can call that Disposable object’s dispose method to instruct the Observable to stop emitting items to its Subscribers.\n",
    "\n",
    "You can also use the connect method to instruct an Observable to begin emitting items (or, to begin generating items that would be emitted) even before any Subscriber has subscribed to it.\n",
    "\n",
    "**In this way you can turn a cold Observable into a hot one.**\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 62,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "\n",
      "========== connect ==========\n",
      "\n",
      "module rx.linq.connectableobservable\n",
      "def connect(self):\n",
      "    Connects the observable.\n",
      "--------------------------------------------------------------------------------\n",
      "\n",
      " 507.1     M New subscription (14637) on stream 276493689\n",
      " 517.2  T193 [next]    9.9: 5 -> 14637\n",
      " 622.8  T193 [next]  115.6: 6 -> 14637\n",
      " 724.9  T193 [next]  217.7: 7 -> 14637\n",
      " 826.5  T193 [next]  319.3: 8 -> 14637\n",
      " 931.7  T193 [next]  424.5: 9 -> 14637\n"
     ]
    }
   ],
   "source": [
    "rst(O.interval(1).publish().connect)\n",
    "published = O.create(emit).publish()\n",
    "\n",
    "def emit(obs):\n",
    "    for i in range(0, 10):\n",
    "        log('emitting', i, obs.__class__.__name__, hash(obs))\n",
    "        # going nowhere\n",
    "        obs.on_next(i)\n",
    "        sleep(0.1)\n",
    "\n",
    "import thread\n",
    "thread.start_new_thread(published.connect, ())\n",
    "sleep(0.5)\n",
    "d = subs(published, scheduler=new_thread_scheduler)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
